home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / sml_nj / cml-098.lha / cml-0.9.8 / src / cml-base.sml < prev    next >
Encoding:
Text File  |  1993-02-19  |  11.8 KB  |  386 lines

  1. (* cml-base.sml
  2.  *)
  3.  
  4. structure CMLBase : CML_BASE =
  5.   struct
  6.  
  7.     open CMLVersion (* for version and versionName *)
  8.  
  9.   (* some utility functions that should be inlined *)
  10.     fun reverse ([], rl) = rl
  11.       | reverse (x :: rest, rl) = reverse(rest, x :: rl)
  12.  
  13.   (* queues *)
  14.     datatype 'a queue_t = Q of {front : 'a list ref, rear : 'a list ref}
  15.  
  16.   (* create a new queue *)
  17.     fun queueNew () = Q{front = ref [], rear = ref []}
  18.  
  19.   (* queue insert *)
  20.     fun queueInsc (Q{rear, ...}) = fn x => (rear := x :: !rear)
  21.  
  22.   (* Per-thread descriptors *)
  23.     datatype thread_id = TID of {  (* thread ids *)
  24.     id       : int,
  25.     done_comm  : bool ref,        (* set this whenever this thread does *)
  26.                     (* some concurrency operation. *)
  27.     death_cond : unit cond_var
  28.       }
  29.   (* condition variables *)
  30.     and 'a cond_var = COND of 'a cond_state ref
  31.     and 'a cond_state
  32.       = COND_unset of (thread_id * bool ref * 'a cont) list
  33.       | COND_set of 'a
  34.  
  35.     val dummyTId = TID{
  36.         id = ~1, done_comm = ref false, death_cond = COND(ref(COND_set()))
  37.       }
  38.  
  39.   (* report an internal error on std_err *)
  40.     fun reportError msg = let
  41.       val s = if (ord(msg) = ord("\n"))
  42.           then implode["\nCML: ", substring(msg, 1, (size msg)-1), "\n"]
  43.         else implode["CML: ", msg, "\n"]
  44.       in
  45.         System.Signals.maskSignals true;
  46.         IO.output(IO.std_err, s);
  47.         System.Signals.maskSignals false
  48.       end
  49.  
  50.     exception InternalError
  51.     fun error s = (reportError("\nINTERNAL ERROR: "^s); raise InternalError)
  52.  
  53.  
  54.   (* the termination function *)
  55.     val shutdown = ref (fn () => ())
  56.  
  57.  
  58.   (* timers *)
  59.     structure T : sig
  60.         datatype time = TIME of {sec : int, usec : int}
  61.     val earlier : time * time -> bool
  62.     val add_time : time * time -> time
  63.     val zeroTime : time
  64.         val currentTime : unit -> time
  65.     val timerOff : unit -> unit
  66.     val timerOn : time option -> unit
  67.     val restartTimer : unit -> unit
  68.       end = struct
  69.         open System.Timer System.Unsafe.CInterface
  70.     val zeroTime = TIME{sec=0, usec=0}
  71.     val tod : unit -> (int * int) =
  72.           wrap_sysfn "timeofday" (c_function "timeofday")
  73.     fun currentTime () = let
  74.           val (s, u) = tod()
  75.           in
  76.         TIME{sec = s, usec = u}
  77.           end
  78.         val saveTime = ref (NONE : time option)
  79.     fun timerOff () = setitimer (0, zeroTime, zeroTime)
  80.     fun timerOn t = (
  81.           saveTime := t;
  82.           case t of (SOME tq) => setitimer (0, tq, tq) | _ => ())
  83.     fun restartTimer () = 
  84.           case !saveTime of (SOME tq) => setitimer (0, tq, tq) | _ => ()
  85.       end
  86.     open T
  87.  
  88.  
  89.   (* thread id marking *)
  90.     fun markTid (TID{done_comm, ...}) = done_comm := true
  91.     fun unmarkTid (TID{done_comm, ...}) = done_comm := false
  92.     fun isMarked (TID{done_comm, ...}) = !done_comm
  93.  
  94.   (* the current thread is represented using the "var" register *)
  95.     val getCurThread : unit -> thread_id = System.Unsafe.getvar
  96.     val setCurThread : thread_id -> unit = System.Unsafe.setvar
  97.  
  98.   (* The thread ready queues:
  99.    * rdyQ1 is the primary queue and rdyQ2 is the secondary queue.
  100.    *)
  101.     val rdyQ1 : (thread_id * unit cont) queue_t = queueNew()
  102.     val rdyQ2 : (thread_id * unit cont) queue_t = queueNew()
  103.  
  104.   (* enqueue a ready thread *)
  105.     val enqueue1 = queueInsc rdyQ1
  106.     val enqueue2 = queueInsc rdyQ2
  107.     fun enqueue (p as (id, _)) = (markTid id; enqueue1 p)
  108.  
  109.   (* enqueue the current thread *)
  110.     fun enqueueCurThread resume = (enqueue(getCurThread(), resume))
  111.  
  112.   (* promote a thread from the secondary queue to the primary queue *)
  113.     fun promote () = (case rdyQ2
  114.        of Q{front=ref [], rear=ref []} => ()
  115.         | Q{front=front as (ref []), rear} => let
  116.         val (x::r) = reverse (!rear, [])
  117.         in
  118.           front := r; rear := []; enqueue1 x
  119.         end
  120.         | Q{front=front as ref (x::r), rear} => (front := r; enqueue1 x)
  121.       (* end case *))
  122.  
  123.   (* preempt the current thread (with continuation k). *)
  124.     fun preempt k = let
  125.       val curTid = getCurThread()
  126.       val curP = (curTid, k)
  127.       in
  128.         if (isMarked curTid)
  129.           then (
  130.         unmarkTid curTid;
  131.         promote ();
  132.         enqueue1 curP)
  133.           else enqueue2 curP
  134.       end
  135.  
  136.  
  137.   (** I/O wait queues **)
  138.     fun pollFDs (rdfds, wrfds, exfds, blocking) = let
  139.       val t = if blocking then NONE else (SOME zeroTime)
  140.       val (rd, wr, ex) = System.Unsafe.SysIO.select(rdfds, wrfds, exfds, t)
  141.       in
  142.         (rd, wr, ex)
  143.       end
  144.     (* The list of I/O wait events *)
  145.       datatype io_operation_t = IO_RD | IO_WR | IO_EX
  146.       type io_item = {
  147.       fd       : int,        (* the file descriptor *)
  148.       io_op       : io_operation_t,    (* the operation being waited for *)
  149.       kont       : unit cont,        (* the synchronization continuation and *)
  150.       id       : thread_id,        (* the id of the waiting thread *)
  151.       err_kont : unit cont,        (* the error continuation of the thread *)
  152.       dirty       : bool ref        (* the dirty bit *)
  153.     }
  154.       val ioWaitList = ref ([] : io_item list)
  155.  
  156.     (* project the different kinds of I/O operations in the I/O wait list. *)
  157.       fun projIO () = let
  158.         fun f ([] : io_item list, rd, wr, ex) = (rd, wr, ex)
  159.           | f ({dirty = ref true, ...}::r, rd, wr, ex) = f(r, rd, wr, ex)
  160.           | f ({io_op = IO_RD, fd, ...}::r, rd, wr, ex) = f(r, fd::rd, wr, ex)
  161.           | f ({io_op = IO_WR, fd, ...}::r, rd, wr, ex) = f(r, rd, fd::wr, ex)
  162.           | f ({io_op = IO_EX, fd, ...}::r, rd, wr, ex) = f(r, rd, wr, fd::ex)
  163.         in
  164.           f(!ioWaitList, [], [], [])
  165.         end
  166.  
  167.     (* check for available I/O operations *)
  168.       fun checkIO shouldBlock = (case projIO()
  169.          of ([], [], []) => ()
  170.           | (rd, wr, ex) => (case pollFDs(rd, wr, ex, shouldBlock)
  171.            of ([], [], []) => ()
  172.             | (rd, wr, ex) => let
  173.             fun f ([], l) = l
  174.               | f (({dirty = ref true, ...} : io_item)::r, l) = f (r, l)
  175.               | f ((x as {io_op, fd, kont, id, dirty, ...})::r, l) = let
  176.                   fun look [] = false
  177.                 | look (x::r) = if (x = fd)
  178.                     then (
  179.                       enqueue(id, kont);
  180.                       dirty := true;
  181.                       true)
  182.                     else (look r)
  183.                   val fdList = (case io_op
  184.                      of IO_RD => rd
  185.                       | IO_WR => wr
  186.                       | IO_EX => ex)
  187.                   in
  188.                 if (look fdList) then f(r, l) else f(r, x::l)
  189.                   end
  190.             in
  191.               ioWaitList := f(!ioWaitList, [])
  192.             end
  193.           (* end case *))
  194.         (* end case *))
  195.         handle (System.Unsafe.CInterface.SystemCall _) => let
  196.           open System.Unsafe.SysIO
  197.           fun testDesc fd = (ftype(DESC fd); false) handle _ => true
  198.           fun findBadDescs ([], l) = l
  199.         | findBadDescs ((x as {fd, dirty, err_kont, id, ...} : io_item)::r, l) =
  200.             if (testDesc fd)
  201.               then (
  202.             enqueue(id, err_kont);
  203.             dirty := true;
  204.             findBadDescs (r, l))
  205.               else findBadDescs (r, x::l)
  206.           in
  207.         ioWaitList := findBadDescs(!ioWaitList, []);
  208.         checkIO shouldBlock
  209.           end
  210.  
  211.     (* insert an I/O operation into the I/O waiting list *)
  212.       fun insIOWait info = ioWaitList := info :: !ioWaitList
  213.  
  214.     (* return true if there is at least one clean I/O wait event on the list *)
  215.       fun waitingForIO () = let
  216.         fun f (l as (({dirty = ref true, ...}::r)) : io_item list) = (f r)
  217.           | f l = l
  218.         in
  219.           case f(!ioWaitList)
  220.            of [] => (ioWaitList := []; false)
  221.         | l => (ioWaitList := l; true)
  222.         end
  223.  
  224.  
  225.   (** Timer waiting queues **)
  226.     datatype time_wait_t = TIMEWAIT of {
  227.     wait_time : time,
  228.     id : thread_id,
  229.     kont : unit cont,
  230.     dirty : bool ref
  231.       }
  232.     val timeWaitList = ref ([] : time_wait_t list)
  233.  
  234.   (* insert a timeout event *)
  235.     fun insTimeWait (tim, id, k, flg) = let
  236.       val item = TIMEWAIT{wait_time=tim, id=id, kont=k, dirty=flg}
  237.       fun scan [] = [item]
  238.         | scan ((t as TIMEWAIT{dirty = ref true, ...})::r) = scan r
  239.         | scan (l as ((t as TIMEWAIT{wait_time, ...})::r)) =
  240.         if (earlier (tim, wait_time))
  241.           then (item::l)
  242.           else (t::(scan r))
  243.       in
  244.         timeWaitList := scan(!timeWaitList)
  245.       end
  246.  
  247.   (* schedule any threads waiting for times earlier than the current time. *)
  248.     fun remTimeWait () = let
  249.       val tim = currentTime()
  250.       fun scan [] = []
  251.         | scan (l as ((t as TIMEWAIT{dirty = ref true, ...})::r)) = scan r
  252.         | scan (l as ((t as TIMEWAIT{wait_time, id, kont, dirty})::r)) =
  253.         if earlier(tim, wait_time)
  254.           then l
  255.           else (enqueue(id, kont); dirty := true; scan r)
  256.       in
  257.         timeWaitList := scan(!timeWaitList)
  258.       end
  259.  
  260.   (* return true if there is at least one clean timeout event on the list *)
  261.     fun waitingForTimeout () = let
  262.       fun f (TIMEWAIT{dirty = ref true, ...}::r) = (f r)
  263.         | f l = l
  264.       in
  265.         case (f (!timeWaitList))
  266.          of [] => (timeWaitList := []; false)
  267.           | l => (timeWaitList := l; true)
  268.       end
  269.     structure Sig = System.Signals
  270.  
  271.  
  272.   (*  test for blocked threads that could conceivably become unblocked *)
  273.     fun checkWaitingThreads () = (
  274.      case (!ioWaitList) of [] => () | _=> (checkIO false);
  275.      case (!timeWaitList) of [] => () | _=> remTimeWait ())
  276.  
  277.   (* global flag for implementing atomic operations *)
  278.     datatype atomic_state = NonAtomic | Atomic | SignalPending
  279.     val atomicState = ref NonAtomic
  280.  
  281.   (* remove a thread from the primary queue.  *)
  282.     fun dequeue1 () = (case rdyQ1
  283.        of (Q{front = ref [], rear = ref []}) => dequeue2()
  284.         | (Q{front = front as (ref []), rear = rear as (ref l)}) => let
  285.         val (x::r) = reverse(l, [])
  286.         in
  287.           front := r; rear := []; x
  288.         end
  289.         | (Q{front = front as (ref(x::r)), ...}) => (front := r; x)
  290.       (* end case *))
  291.   (* remove a thread from the secondary queue (assuming that the
  292.    * primary queue is empty.
  293.    *)
  294.     and dequeue2 () = let
  295.     (* wait for I/O or delay when there are no ready threads. *)
  296.       fun waitForSomething () = (case (waitingForIO(), waitingForTimeout())
  297.            of (false, false) => (!shutdown)()
  298.         | (_, false) => (timerOff(); checkIO true; restartTimer())
  299.         | _ => (System.Signals.pause(); checkWaitingThreads()))
  300.       fun dequeue2 () = (case rdyQ2
  301.            of (Q{front = ref [], rear = ref []}) => (waitForSomething(); dequeue1())
  302.         | (Q{front = front as (ref []), rear = rear as (ref l)}) => let
  303.             val (x::r) = reverse(l, [])
  304.             in
  305.               front := r; rear := []; x
  306.             end
  307.         | (Q{front = front as (ref(x::r)), ...}) => (front := r; x))
  308.       in
  309.         dequeue2()
  310.       end (* dequeue2 *)
  311.  
  312.     fun atomicDispatch () = let
  313.       val _ = (case !atomicState
  314.          of SignalPending => checkWaitingThreads()
  315.           | _ => ()
  316.         (* end case *))
  317.       val (id, kont) = dequeue1()
  318.       in
  319.         setCurThread id;
  320.         atomicState := NonAtomic;
  321.         throw kont ()
  322.       end
  323.  
  324.   (* Complete the exit from an atomic region when there is a pending signal. *)
  325.     fun handlePendingSignal () =
  326.       callcc (fn k => (preempt k; atomicDispatch()))
  327.  
  328.   (* initialize the atomic region support *)
  329.     fun initAtomic () = let
  330.       val checkIOKont = callcc (fn k1 => (
  331.         callcc (fn k2 => (throw k1 k2));
  332.           (* NOTE: this continuation always starts in an atomic region
  333.            * with atomicState = SignalPending.
  334.            *)
  335.         atomicDispatch()))
  336.       fun alrm_handler (_, k) = (case !atomicState
  337.          of NonAtomic => (
  338.               preempt k;
  339.             (* We set the atomicState to SignalPending to force a
  340.              * check for I/O and timers in checkIOKont.
  341.              *)
  342.               atomicState := SignalPending;
  343.               checkIOKont)
  344.           | _ => (atomicState := SignalPending;  k)
  345.         (* end case *))
  346.       in
  347.         Sig.setHandler (Sig.SIGALRM, SOME alrm_handler);
  348.         atomicState := NonAtomic
  349.       end
  350.  
  351.   (* return the # of threads created *)
  352.     fun load () = let
  353.       val _ = (atomicState := Atomic)
  354.       fun count (Q{front, rear}) = List.length(!front) + List.length(!rear)
  355.       val res = (count rdyQ1 + count rdyQ2)
  356.       in
  357.         case !atomicState
  358.          of SignalPending => handlePendingSignal()
  359.           | _ => atomicState := NonAtomic
  360.         (* end case *);
  361.         res
  362.       end
  363.  
  364.     fun initCMLBase () = let
  365.       fun emptyQ (Q{front, rear}) = (front := []; rear := [])
  366.       in
  367.         emptyQ rdyQ1; emptyQ rdyQ2;
  368.         ioWaitList := [];
  369.         timeWaitList := [];
  370.         initAtomic()
  371.       end
  372.  
  373.     val running = ref false
  374.  
  375.   (* mark the beginning of CML execution and start the timer *)
  376.     fun go tq = (running := true; timerOn tq)
  377.  
  378.   (* turn the timer off, and mark the end of CML execution *)
  379.     fun stop () = (timerOff(); running := false)
  380.  
  381.   (* returns true if CML is running *)
  382.     fun isRunning () = !running
  383.  
  384.   end; (* CMLBase *)
  385.  
  386.